Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DynamoDB source plugin #3349

Merged
merged 8 commits into from
Oct 3, 2023
Merged

Conversation

daixba
Copy link
Contributor

@daixba daixba commented Sep 18, 2023

Description

Add support of DynamoDB as source.

Issues Resolved

Resolves #2932

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@daixba
Copy link
Contributor Author

daixba commented Sep 18, 2023

Submit the PR to receive early feedbacks. Still working on testing, will submit the UTs soon.

Comment on lines +32 to +35
coordinator:
dynamodb:
table_name: "coordinator-demo"
region: "us-west-2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make an issue to get the configuration for this coordinator moved to the data-prepper-config.yaml to be completed in a future PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will do once this code is merged. I will create one to refer to this.

It looks like we can either expose a getCoordinationStore from current LeaseBasedCoordinator or by default only create coordinationStore from the data-prepper-config.yaml only instead of the coordinator. Not sure what is the best way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will want to use the data-prepper-config only. We can use the same flow that we currently use to provide plugins with a source coordinaor

Comment on lines 111 to 122
LOG.info("Gracefully shutdown DynamoDB schedulers");
executor.shutdownNow();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing against calling shutdownNow, but I don't think it's categorized as graceful shutdown since it forces the thread to quit immediately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I think this is confusing, actually, the scheduler will be interrupped immediately (which is OK), and the scheduler will implement logic to shutdown the real jobs (such as stream consumer) gracefully. I will remove this.

@Override
public void stop() {
LOG.info("Stop DynamoDB Source");
dynamoDBService.shutdown();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should wrap this call in an if (Objects.nonNull(dynamoDBService) check. Not a big deal but NullPointer would happen if this gets run before line 66

Copy link
Contributor Author

@daixba daixba Sep 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update.

* <li>Support multiple leases at the same time</li>
* </ul>
*/
public interface Coordinator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could call this EnhancedSourceCoordinator

// Do nothing
// The consumer must already do one last checkpointing and then release the lease.
LOG.debug("Shard consumer completed with exception");
LOG.error(ex.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's give more context to this error with a message saying where the exception has occurred


public class StreamConfig {

public static final String BEGINNING = "BEGINNING";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be "enum"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to use Enum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this back to String due to an exception that was raised in the new jackson lib (it was running OK previously) : Caused by: com.fasterxml.jackson.databind.JsonMappingException: Cannot construct instance of java.lang.Enum (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information.

Will try to fix this in later PRs.



@JsonProperty("coordinator")
private PluginModel coordinationStoreConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be injecting coordination configurations here. This duplicates configurations and creates additional confusion for customers.

Perhaps the ideal is to require that customers configure the necessary coordinator in data-prepper-config.yaml?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@daixba daixba requested a review from dlvenable September 22, 2023 12:15
Copy link
Member

@graytaylor0 graytaylor0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized the build is failing due to unit tests in the dynamo source. Once these are resolved I am good to approve. Thanks!

Signed-off-by: Aiden Dai <[email protected]>
Signed-off-by: Aiden Dai <[email protected]>
Signed-off-by: Aiden Dai <[email protected]>
Signed-off-by: Aiden Dai <[email protected]>
Signed-off-by: Aiden Dai <[email protected]>
Signed-off-by: Aiden Dai <[email protected]>
graytaylor0
graytaylor0 previously approved these changes Sep 26, 2023
Copy link
Collaborator

@kkondaka kkondaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@daixba I do not see any code for e2e acknowledgement support? Will that be added as a separate PR?

data-prepper-plugins/dynamodb-source/build.gradle Outdated Show resolved Hide resolved
data-prepper-plugins/dynamodb-source/build.gradle Outdated Show resolved Hide resolved

@Override
public void start(Buffer<Record<Event>> buffer) {
LOG.info("Start Processing");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add something to this log line to help clarify what it starting. Perhaps "Start processing DynamoDB streams for {table} on {bucket}" or something similar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to just 'Start DynamoDB service' for what it's actually doing, more log details will be in the service side.

return new Record<>(event);
}

public Record<Event> convertToEvent(Map<String, Object> data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's generally better design to split concerns such as writing to the buffer and converting record types. Perhaps this can be a follow-on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was called RecordProcessor which purpose is to convert the raw records to Jackson event and then write to buffer. Now it's renamed to RecordConverter but I didn't change the scope of this class.

/**
* Currently, this is to stop all consumers.
*/
public static void stopAll() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is static, it will stop all consumers in all pipelines. Use an instance per pipeline instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this has impacts to other pipelines, if they are running in separate containers or different nodes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data Prepper can run multiple parallel pipelines. If you have two pipelines with the dynamodb source they will share the same static shouldStop variable. Thus, when one pipeline shuts down, the other pipeline's DataFileLoader is effectively stopped. This may not be what the user wants.


private final AtomicInteger numOfWorkers = new AtomicInteger(0);

private static final int MAX_JOB_COUNT = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be tunable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do more performance test later. The observation is that the file read is too fast but the ingestion can't catch up.

}

@Override
public void run() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we have this one thread which then creates two other threads. Can we eliminate this "scheduler" thread somehow? Perhaps just run two threads that each acquire a partition and then process it.

} else {
records = response.records();
}
recordConverter.writeToBuffer(records);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do both the file reader thread path and stream scheduler thread path write to the buffer? What is the difference in these outputs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are converted into the same Jackson events and then sent to the same buffer and eventually same destination.

Signed-off-by: Aiden Dai <[email protected]>
@daixba
Copy link
Contributor Author

daixba commented Oct 3, 2023

@daixba I do not see any code for e2e acknowledgement support? Will that be added as a separate PR?

It's not implemented yet (I don't know how it works). Will need a separate PR if needed.

Comment on lines +68 to +76
- `exportJobsSuccess`: measures total number of export jobs run with status completed.
- `exportJobsErrors`: measures total number of export jobs cannot be submitted or run with status failed.
- `exportFilesTotal`: measures total number of export files generated.
- `exportFilesSuccess`: measures total number of export files read (till the last line) successfully.
- `exportRecordsTotal`: measures total number of export records generated
- `exportRecordsSuccess`: measures total number of export records processed successfully .
- `exportRecordsErrors`: measures total number of export records processed failed
- `changeEventsSucceeded`: measures total number of changed events in total processed successfully
- `changeEventsFailed`: measures total number of changed events in total processed failed
Copy link
Member

@graytaylor0 graytaylor0 Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like these metrics mainly cover export. Are there any metrics that we should add for streams? Things like number of shards processed?

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are still some unresolved comments from the PR. I have left the conversations in the PR as unresolved. Let's follow up with another PR to address them.

@graytaylor0 graytaylor0 merged commit b69f81b into opensearch-project:main Oct 3, 2023
46 of 47 checks passed
@daixba daixba deleted the ddb-source branch October 20, 2023 02:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support DynamoDB as source
4 participants